package com.dongdongshop.mq;

import com.dongdongshop.service.SendMessageService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@RocketMQMessageListener(topic = "phoneNum", consumerGroup = "consumer_phoneNum")
public class SendMessageMQConsumer implements RocketMQListener<String[]>, RocketMQPushConsumerLifecycleListener {

    @Autowired
    private SendMessageService sendMessageService;

    @Override
    public void onMessage(String[] strings) {
        System.out.println("消费成功>>" + strings);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //获取Message
                MessageExt messageExt = list.get(0);
                //获取信息id
                String msgId = messageExt.getMsgId();
                System.out.println("消息ID>>>" + msgId);
                //获取信息重试次数
                int reconsumeTimes = messageExt.getReconsumeTimes();
                System.out.println("消息重试次数>>>" + reconsumeTimes);
                try {
                    //获取消息内容
                    byte[] body = messageExt.getBody();
                    String message = new String(body);
                    System.out.println("消息内容>>>" + message);
                    System.out.println("消息主题>>>" + messageExt.getTopic());
                    sendMessageService.sendMessage(message);
                    System.out.println("消费者消费");
                }catch (Exception e){
                    e.printStackTrace();
                    if(reconsumeTimes > 2){
                        //重试次数大于2 进入死信队列
                        consumeConcurrentlyContext.setDelayLevelWhenNextConsume(-1);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    //消费异常,进行重试
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }
}
